#!/bin/bash

set -o errexit

test_dir=$(realpath $(dirname $0))
. ${test_dir}/../functions

set_debug

GTID_PATTERN='[A-F0-9a-f]{8}-[A-F0-9a-f]{4}-[A-F0-9a-f]{4}-[A-F0-9a-f]{4}-[A-F0-9a-f]{12}:[0-9]+'

if [[ $IMAGE_PXC =~ 5\.7 ]]; then
	echo "Skipping PITR test because 5.7 doesn't support it!"
	exit 1
fi

run_recovery_check_pitr() {
	local cluster=$1
	local restore=$2
	local backup=$3
	local compare=$4
	local time_now=$5
	local dest=$6
	local gtid=$7

	desc 'recover backup' $restore
	cat "$test_dir/conf/${restore}.yaml" \
		| $sed -e "s/<datetime>/${time_now}/g" \
		| $sed -e "s/<destination>/${dest}/g" \
		| $sed -e "s/<gtid>/${gtid}/g" \
		| kubectl_bin apply -f -
	wait_backup_restore ${backup}
	kubectl_bin logs job/restore-job-${backup}-${cluster}
	wait_for_running "$cluster-proxysql" 2
	wait_for_running "$cluster-pxc" 3
	wait_cluster_consistency "$cluster" 3 2
	desc 'check data after backup' $restore
	compare_mysql_cmd $compare "SELECT * from test.test;" "-h $cluster-pxc-0.$cluster-pxc -uroot -proot_password"
	compare_mysql_cmd $compare "SELECT * from test.test;" "-h $cluster-pxc-1.$cluster-pxc -uroot -proot_password"
	compare_mysql_cmd $compare "SELECT * from test.test;" "-h $cluster-pxc-2.$cluster-pxc -uroot -proot_password"
	kubectl_bin delete -f "$test_dir/conf/${restore}.yaml"
}

write_test_data() {
	local cluster=$1
	local config=$2
	local size="${3:-3}"
	local sleep="${4:-10}"
	local secretsFile="${5:-$conf_dir/secrets.yml}"
	local pxcClientFile="${6:-$conf_dir/client.yml}"

	local proxy=$(get_proxy "$cluster")

	desc 'write test data'
	if [[ $IMAGE_PXC =~ 5\.7 ]] && [[ "$(is_keyring_plugin_in_use "$cluster")" ]]; then
		encrypt='ENCRYPTION=\"Y\"'
	fi
	run_mysql \
		"CREATE DATABASE IF NOT EXISTS test; use test; CREATE TABLE IF NOT EXISTS test (id int PRIMARY KEY) $encrypt;" \
		"-h $proxy -uroot -proot_password"
	run_mysql \
		'INSERT test.test (id) VALUES (100500); INSERT test.test (id) VALUES (100501); INSERT test.test (id) VALUES (100502);' \
		"-h $proxy -uroot -proot_password"
	sleep 30
	for i in $(seq 0 $((size - 1))); do
		compare_mysql_cmd "select-3" "SELECT * from test.test;" "-h $cluster-pxc-$i.$cluster-pxc -uroot -proot_password"
	done

	if [ "$(is_keyring_plugin_in_use "$cluster")" ]; then
		table_must_be_encrypted "$cluster" "test"
	fi
}

write_data_for_pitr() {
	local cluster=$1
	local proxy=$(get_proxy "$cluster")

	desc "write data for pitr"
	run_mysql \
		'INSERT test.test (id) VALUES (100503); INSERT test.test (id) VALUES (100504); INSERT test.test (id) VALUES (100505);' \
		"-h $proxy -uroot -proot_password"
}

write_more_data() {
	local cluster=$1
	local proxy=$(get_proxy "$cluster")
	desc "write extra data"
	run_mysql \
		'INSERT test.test (id) VALUES (100506); INSERT test.test (id) VALUES (100507); INSERT test.test (id) VALUES (100508); INSERT test.test (id) VALUES (100509); INSERT test.test (id) VALUES (100510);' \
		"-h $proxy -uroot -proot_password"
}

create_binlog_gap() {
	desc 'create binlog gap'

	kubectl patch pxc pitr --type=merge -p '{"spec":{"backup":{"pitr":{"enabled":false}}}}'
	sleep 5 # wait for pitr pod to shutdown
	# write data which will be lost
	run_mysql \
		'INSERT test.gap (id) VALUES (100800); INSERT test.gap (id) VALUES (100801); INSERT test.gap (id) VALUES (100802);' \
		"-h $proxy -uroot -proot_password"
	# flush binlogs
	run_mysql_local 'FLUSH BINARY LOGS;' "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-0"
	run_mysql_local 'FLUSH BINARY LOGS;' "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-1"
	run_mysql_local 'FLUSH BINARY LOGS;' "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-2"
	# purge binlogs
	run_mysql_local 'PURGE BINARY LOGS BEFORE now();' "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-0"
	run_mysql_local 'PURGE BINARY LOGS BEFORE now();' "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-1"
	run_mysql_local 'PURGE BINARY LOGS BEFORE now();' "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-2"
	# re-enable pitr
	kubectl patch pxc pitr --type=merge -p '{"spec":{"backup":{"pitr":{"enabled":true}}}}'
	# flush binlogs to trigger binlog collector
	# data below will be force recovered
	run_mysql \
		'INSERT test.gap (id) VALUES (100803); INSERT test.gap (id) VALUES (100804); INSERT test.gap (id) VALUES (100805);' \
		"-h pitr-proxysql -uroot -proot_password"
	run_mysql_local 'FLUSH BINARY LOGS;' "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-0"
	run_mysql_local 'FLUSH BINARY LOGS;' "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-1"
	run_mysql_local 'FLUSH BINARY LOGS;' "-h127.0.0.1 -P3306 -uroot -proot_password" "$cluster-pxc-2"
	sleep 65 # wait for next PITR collect cycle and error to appear
}

check_binlog_gap_error() {
	desc 'check binlog gap error'

	# check error in pitr log
	local err_text1=$(kubectl_bin logs $(get_pitr_pod) | grep -c "ERROR: Couldn't find the binlog that contains GTID set")
	local err_text2=$(kubectl_bin logs $(get_pitr_pod) | grep -c "ERROR: Gap detected in the binary logs. Binary logs will be uploaded anyway, but full backup needed for consistent recovery.")
	if [[ $err_text1 -eq 0 || $err_text2 -eq 0 ]]; then
		echo "ERROR: Gap error text is not found in PITR pod logs."
		exit 1
	fi
	# check error in operator log
	local err_text3=$(kubectl_bin logs ${OPERATOR_NS:+-n$OPERATOR_NS} $(get_operator_pod) | grep -c "Gap detected in binary logs")
	if [[ $err_text3 -eq 0 ]]; then
		echo "ERROR: Gap error text is not found in operator pod logs."
		exit 1
	fi
	# check backup on-pitr-minio-gap marked as unready for PITR restore
	local backup_cond=$(kubectl_bin get pxc-backup on-pitr-minio-gap -ojsonpath='{.status.conditions[]}' | grep -c '"reason":"BinlogGapDetected","status":"False","type":"PITRReady"')
	if [[ $backup_cond -eq 0 ]]; then
		echo "ERROR: Backup is not tagged as PITR unready in the backup condition."
		kubectl_bin get pxc-backup on-pitr-minio-gap -oyaml
		exit 1
	fi
}

check_binlog_gap_restore() {
	local type=$1

	desc 'check binlog gap restore: ' $type
	# disable pitr
	kubectl patch pxc pitr --type=merge -p '{"spec":{"backup":{"pitr":{"enabled":false}}}}'
	# try restore, check error
	if [ "$type" == "error" ]; then
		kubectl_bin apply -f $test_dir/conf/restore-on-pitr-minio-gap-error.yaml
		wait_backup_restore "on-pitr-minio-gap-error" "Failed"
		local backup_error=$(kubectl_bin get pxc-restore on-pitr-minio-gap-error -ojsonpath='{.status.comments}' | grep -c "Backup doesn't guarantee consistent recovery with PITR. Annotate PerconaXtraDBClusterRestore with percona.com/unsafe-pitr to force it.")
		if [[ $backup_error -eq 0 ]]; then
			echo "ERROR: Backup is not tagged as PITR unready in the backup condition."
			kubectl_bin get pxc-backup on-pitr-minio-gap -oyaml
			exit 1
		fi
		kubectl_bin delete -f "$test_dir/conf/restore-on-pitr-minio-gap-error.yaml"
	elif [ "$type" == "force" ]; then
		kubectl_bin apply -f "$test_dir/conf/restore-on-pitr-minio-gap-force.yaml"
		wait_backup_restore "on-pitr-minio-gap-force" "Succeeded"
		wait_for_running "$cluster-proxysql" 2
		wait_for_running "$cluster-pxc" 3
		wait_cluster_consistency "$cluster" 3 2
		kubectl_bin logs job/restore-job-on-pitr-minio-gap-force-${cluster}
		compare_mysql_cmd "select-gap" "SELECT * from test.gap;" "-h $cluster-pxc-0.$cluster-pxc -uroot -proot_password"
		compare_mysql_cmd "select-gap" "SELECT * from test.gap;" "-h $cluster-pxc-1.$cluster-pxc -uroot -proot_password"
		compare_mysql_cmd "select-gap" "SELECT * from test.gap;" "-h $cluster-pxc-2.$cluster-pxc -uroot -proot_password"
		kubectl_bin delete -f "$test_dir/conf/restore-on-pitr-minio-gap-force.yaml"
	else
		echo "Wrong restore type!"
		exit 1
	fi
}

main() {
	create_infra $namespace
	deploy_cert_manager
	kubectl_bin apply -f "$test_dir/conf/issuer.yml"
	kubectl_bin apply -f "$test_dir/conf/cert.yml"
	sleep 25
	# We are using minio with tls enabled to check if `verifyTLS: false` works fine
	start_minio "tls-minio"

	cluster="pitr"
	spinup_pxc "$cluster" "$test_dir/conf/$cluster.yml"

	run_backup "$cluster" "on-pitr-minio"

	write_test_data "$cluster"

	desc 'show binlog events'
	proxy=$(get_proxy "$cluster")
	run_mysql "SHOW BINLOG EVENTS IN 'binlog.000005';" "-h ${proxy} -uroot -proot_password"
	run_mysql "SHOW BINLOG EVENTS IN 'binlog.000006';" "-h ${proxy} -uroot -proot_password"

	time_now=$(run_mysql "SELECT now();" "-h ${proxy} -uroot -proot_password")
	gtid=$(run_mysql "SELECT @@gtid_executed;" "-h ${proxy} -uroot -proot_password" | $sed 's/^\(.*\):[0-9]-\([0-9]*\)/\1:\2/')

	if [[ ! ${gtid} =~ ${GTID_PATTERN} ]]; then
		printf "Some garbage --> %s <-- instead of legit GTID. Exiting" ${gtid}
		exit 1
	fi

	write_data_for_pitr "$cluster"
	sleep 120 # need to wait while collector catch new data

	timeout=60
	binlogs_exist=0
	for i in $(seq 2 3); do
		binlogs_exist=$(
			kubectl_bin run -n "${NAMESPACE}" -i --rm aws-cli --image=perconalab/awscli --restart=Never -- \
				/usr/bin/env AWS_ACCESS_KEY_ID=some-access-key AWS_SECRET_ACCESS_KEY=some-secret-key AWS_DEFAULT_REGION=us-east-1 \
				/usr/bin/aws --endpoint-url https://minio-service:9000 --no-verify-ssl s3 ls operator-testing/binlogs/ | grep -c "binlog" | cat
			exit "${PIPESTATUS[0]}"
		)
		if [ "$binlogs_exist" -eq 0 ]; then
			sleep "$((timeout * i))"
		fi
	done

	if [ "$binlogs_exist" -eq 0 ]; then
		echo "Binlogs are not found in S3"
		exit 1
	fi

	run_recovery_check_pitr "$cluster" "restore-on-pitr-minio-gtid" "on-pitr-minio" "select-2" "" "" "$gtid"
	desc "done gtid type"

	run_recovery_check_pitr "$cluster" "restore-on-pitr-minio-time" "on-pitr-minio" "select-3" "$time_now" "" ""
	desc "done date type"

	dest=$(sed 's,/,\\/,g' <<<$(kubectl get pxc-backup on-pitr-minio -o jsonpath='{.status.destination}'))
	run_recovery_check_pitr "$cluster" "restore-on-pitr-minio" "on-pitr-minio" "select-4" "" "$dest" ""
	desc "done latest type"

	desc 'check second backup/restore data from binlogs'
	kubectl patch pxc pitr --type=merge -p '{"spec":{"backup":{"pitr":{"enabled":true}}}}'
	sleep 20
	run_backup "$cluster" "on-pitr-minio1"
	write_more_data "$cluster"
	dest=$(sed 's,/,\\/,g' <<<$(kubectl get pxc-backup on-pitr-minio1 -o jsonpath='{.status.destination}'))
	sleep 80 # need to wait while collector catch new data
	run_recovery_check_pitr "$cluster" "restore-on-pitr-minio1" "on-pitr-minio1" "select-5" "" "$dest" ""

	desc 'binlog gap test'
	desc 'create binlog gap backup (will be marked as PITR unready)'
	run_mysql \
		"CREATE DATABASE IF NOT EXISTS test; use test; CREATE TABLE IF NOT EXISTS gap (id int PRIMARY KEY);" \
		"-h $proxy -uroot -proot_password"
	run_backup "$cluster" "on-pitr-minio-gap"
	create_binlog_gap
	check_binlog_gap_error
	check_binlog_gap_restore "error"
	check_binlog_gap_restore "force"
	desc "done binlog gap test"

	destroy $namespace
	desc "test passed"
}

main
